package com.matrix.async.core;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;

import com.matrix.async.bean.AsyncConfigurationBean;
import com.matrix.async.dao.AsyncConfigurationDao;
import com.matrix.async.service.AsyncTaskService;
import com.matrix.core.tools.WebUtil;

public class Producer extends WorkThread {
	
	private Logger log = Logger.getLogger(WorkThread.class);
	private AsyncTaskService asyncTaskService;

	private final APPQueue taskQueue;

	private TaskHandler taskHandle = null;

	// 批量获取任务的大小
	private int batchSize = 10;

	@SuppressWarnings("unused")
	private RetryPolicy retryPolicy = null;

	private Map<?, ?> rushHour = new HashMap<>();

	private boolean randomInterval = true;

	private int interval = 30;

	private int rushHourInterval = 1;
	
	AsyncConfigurationBean selectBytaskType;
	
	AsyncConfigurationDao asyncConfigurationDao;

	Producer(String taskType, APPQueue taskQueue, TaskHandler taskHandler, AsyncTaskService asyncTaskService,
			ThreadFactory threadFactory) throws ClassNotFoundException, InstantiationException, IllegalAccessException {
		asyncConfigurationDao = (AsyncConfigurationDao) WebUtil.getBean("asyncConfigurationDao");
		this.name = "Producer [" + taskType + "]";
		this.taskType = taskType;
		this.taskQueue = taskQueue;
		this.taskHandle = taskHandler;
		this.asyncTaskService = asyncTaskService;
		this.thread = (threadFactory != null ? threadFactory.newThread(name, this)
				: WorkThread.defaultThreadFactory().newThread(name, this));
		log.info("生产者被创建=" + this.name);

	}

	/**
	 * 生产者工作方法
	 */
	protected void work() {
//			selectBytaskType = asyncConfigurationDao.selectBytaskType(taskType);
//			if(selectBytaskType.getAppFactoryBean().equals("false")){
//				runState = SUSPEND;
//				return;
//			}
		List<Task> tasks = getTask();
		if (tasks.size() > 0) {
			taskQueue.put(tasks);
			log.info("新生产任务"+tasks.size()+"个");
		} else {
			// 如果数据库中任务不够，则等待一段时间
			try {

				if (isRushHour()) {
					Thread.sleep(rushHourInterval * 1000);
				} else {
					if (randomInterval) {
						Thread.sleep((int) (Math.random() * interval) * 1000);
					} else {
						Thread.sleep(interval * 1000);
					}
				}

			} catch (InterruptedException e) {
				log.error(e.getMessage(), e);
			}
		}

	}

	/**
	 * 是否处于高分期
	 * 
	 * @return
	 */
	private boolean isRushHour() {
		return rushHour.get(Tools.getDateString(new Date(), "HH")) != null;
	}

	private List<Task> getTask() {
		try {
			// 查询待处理任务包id
			List<String> list = asyncTaskService.getTaskPackageIdList(taskType, batchSize);
			List<Task> taskList = new ArrayList<>();
			for (String packageId : list) {
				List<String> taskIdList = asyncTaskService.getTaskIdList(packageId);
				TaskPackage taskPackage = new TaskPackage(packageId, taskType, asyncTaskService);
				for (String taskId : taskIdList) {
					Map<String, String> data = asyncTaskService.getTaskDataList(taskId);
					Task task = new AsyncTask(taskId, taskType, asyncTaskService, data, taskHandle);
					taskPackage.addTask(task);
				}
				taskList.add(taskPackage);
			}
			return taskList;
		} catch (Exception e) {
			log.error(e.getMessage(), e);
		}
		return new ArrayList<>();
	}

	public void setBatchSize(int batchSize) {
		this.batchSize = batchSize;

	}

	public void setRetryPolicy(RetryPolicy retryPolicy) {
		this.retryPolicy = retryPolicy;
	}

	public void setRushHour(Map<?, ?> rushHour) {
		this.rushHour = rushHour;
	}

	public void setRandomInterval(boolean randomInterval) {
		this.randomInterval = randomInterval;

	}

	public void setInterval(int interval) {
		this.interval = interval;

	}

	public void setRunshHourInterval(int rushHourInterval) {
		this.rushHourInterval=rushHourInterval;
		
	}

	
}
